1 | func main() { |
上一篇我们分析了Dial建立连接的流程。现在继续往下分析。
1 | client := server_hello_proto.NewTigerServiceClient(conn) |
这里是使用pb文件的代码去新建一个连接client,其实也就是使用一个结构体类型(TigerServiceClient)的值保存clientConn连接属性。1
2
3type tigerServiceClient struct {
cc *grpc.ClientConn
}
这个TigerServiceClient类型实现了TigerServiceClient接口,也就是实现了根据proto定义的rppc service 接口编译自动生成的客户端stub api接口。1
2
3
4type TigerServiceClient interface {
HelloTiger(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
...
}
HelloTiger这个方法入参有三个,查看grpc.CallOption这个参数类型,这是一个接口类型1
2
3
4
5
6
7
8
9
10
11// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
// before is called before the call is sent to any server. If before
// returns a non-nil error, the RPC fails with that error.
before(*callInfo) error
// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
}
这个接口类型有before和after方法,我们可以实现这个接口,在RPC方法调用前后会调用before和after方法去执行我们的实现逻辑。
然后我们再看客户端stub api的方法实现1
2
3
4
5
6
7
8func (c *tigerServiceClient) HelloTiger(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
out := new(HelloResponse)
err := c.cc.Invoke(ctx, "/server_hello_proto.TigerService/HelloTiger", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
这里的核心是Invoke调用,也就是本篇分析的重点。
1 |
|
invoke函数声明1
2
3
4
5
6
7
8
9
10
11
12
13func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
//获取传输层 Trasport 并组合封装到 ClientStream 中返回,在这块会涉及负载均衡、超时控制、 Encoding、 Stream 的动作,与服务端基本一致的行为。
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
//发送请求
if err := cs.SendMsg(req); err != nil {
return err
}
//返回响应
return cs.RecvMsg(reply)
}
创建客户端流对象时,会循环调用callOption的before方法,做一些发送请求前的处理1
2
3
4
5for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}
发送请求SendMsg方法声明1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (cs *clientStream) SendMsg(m interface{}) (err error) {
defer func() {
if err != nil && err != io.EOF {
// Call finish on the client stream for errors generated by this SendMsg
// call, as these indicate problems created by this client. (Transport
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
// error will be returned from RecvMsg eventually in that case, or be
// retried.)
cs.finish(err)
}
}()
// 判断客户端流是否已关闭
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
// 判断当前流是否是客户端流
if !cs.desc.ClientStreams {
cs.sentLast = true
}
//对请求信息预处理,序列化、压缩,生成
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}
// TODO(dfawley): should we be checking len(data) instead?
//判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
msgBytes := data // Store the pointer before setting to nil. For binary logging.
// 创建发送消息的函数
op := func(a *csAttempt) error {
//真正发送数据的地方
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
// 开始发送(带重试机制)
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
Message: msgBytes,
})
}
return
}
客户端流对象接收响应信息方法声明1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
if cs.binlog != nil {
recvInfo = &payloadInfo{}
}
//接收服务端结果,并且反序列化,填充到m对象上,m就是返回值
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
})
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)
if cs.binlog != nil {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: cs.Trailer(),
Err: err,
}
if logEntry.Err == io.EOF {
logEntry.Err = nil
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
}
}
return err
}